package com.zhoujing.rabbltmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.zhoujing.rabbltmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

/**
 * @author 周敬
 * @version 1.0
 * @createTime 2022/7/10-21:13-星期日
 * <p>
 * 发布确认
 * 单个发布
 */
public class ConfirmMessage {

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = UUID.randomUUID().toString();

    /**
     * 批量发送消息的数量
     */
    public static final Integer MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 单个确认  耗时秒为：98
        // singleConfirm();
        // 批量确认 耗时为：1
        // batchConfirm();
        // 异步确认 耗时为：0
        publishMessageAsync();  
    }


    /**
     * 单个确认
     *
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    public static void singleConfirm() throws IOException, TimeoutException, InterruptedException {

        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        // 队列声明
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 记录开始时间
        long startTime = System.currentTimeMillis();

        for (Integer i = 0; i < MESSAGE_COUNT; i++) {
            channel.basicPublish("", QUEUE_NAME, null, i.toString().getBytes());
            // 消息发送完立即消息确认
            boolean flag = channel.waitForConfirms();
            if (!flag) {
                System.out.println("消息发送失败");
            }
        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("耗时秒为：" + (endTime - startTime) / 1000);

    }

    /**
     * 批量确认
     * 每循环100次进行发布确认
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    public static void batchConfirm() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 记录开始时间
        long startTime = System.currentTimeMillis();
        // 每100次进行消息确认
        int batchLength = 100;
        for (Integer i = 0; i < MESSAGE_COUNT; i++) {
            channel.basicPublish("", QUEUE_NAME, null, i.toString().getBytes());
            if (i%batchLength == 0) {
                boolean flag = channel.waitForConfirms();
            }
        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("耗时秒为：" + (endTime - startTime) / 1000);
    }

    /**
     * 异步确认
     * @throws IOException
     * @throws TimeoutException
     */
    public static void publishMessageAsync() throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        /*
         *  线程安全有序的一个跳表，适用于高并发的情况下
         *  1、轻松的将序号与消息进行关联
         *  2、轻松批量删除条目，只要给到序号
         *  3、支持高并发（多线程）
         */
        ConcurrentNavigableMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

        // 监听成功回调
        ConfirmCallback ackCallback = (tag,multiple)->{
            // 如果是批量发消息就将其全部删除掉
            if (multiple) {
                // 2、总消息减去发送成功的消息剩下来就是发送失败的消息
                ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(tag,true);
                headMap.clear();
            }else{
                // 删除单个
                outstandingConfirms.remove(tag);
            }

            System.out.println("确认的消息："+tag);

        };
        // 监听失败的回调
        ConfirmCallback nackCallback = (tag,multiple)->{
            System.out.println("未确认的消息：" + tag);
        };

        /*
        * 准备消息监听器，监听哪些消息成功了，哪些消息失败了
        * 参数1：成功回调
        * 参数2：失败回调
        * */
        channel.addConfirmListener(ackCallback,nackCallback);
        // 记录开始时间
        long startTime = System.currentTimeMillis();
        // 批量发送消息
        for (Integer i = 0; i < MESSAGE_COUNT; i++) {
            String message = i.toString();
            // 1、记录所有要发的消息，消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo()-1,message);
            channel.basicPublish("",QUEUE_NAME,null,i.toString().getBytes());
        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.err.println("耗时秒为：" + (endTime - startTime) / 1000);
    }
}
